2024 iThome Ironman Challenge Day 21: ES as Vector DB II

Today's goal is to store some vectors in ES, so the next post can use them for KNN search.

From this point on we'll switch to TMDB movie data. The Twitter data used earlier doesn't search well with KNN unless it's preprocessed first (stripping URLs, hashtags, and so on), and text preprocessing is an extremely time-consuming, case-by-case job, so allow me to sidestep it here.

The TMDB API offers a lot of functionality (registration is required to get an API token). One of its endpoints returns the top-rated entries in the database; below we'll grab the top 300 top-rated movies and use each movie's overview field as the text to be converted into a word vector.

As before, Python is used to fetch the data:

def get_tmdb_movies(pages=1):
    # assumes the TMDB API token is exported as TMDB_TOKEN (register on TMDB to get one)
    tmdb_token = os.getenv("TMDB_TOKEN")
    data = []
    headers = {
        "accept": "application/json",
        "Authorization": f"Bearer {tmdb_token}"
    }
    for page in range(pages):
        # each page of the top_rated endpoint holds 20 movies
        url = f"https://api.themoviedb.org/3/movie/top_rated?language=zh-TW&page={page+1}"
        response = requests.get(url, headers=headers)
        if response.status_code == 200:
            data.extend(response.json()['results'])
            # small random pause between requests to be polite to the API
            time.sleep(random.randint(1, 3))
        else:
            print(f"Error: {response.status_code}, {response.text}")
    return data
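As a quick sanity check (assuming the token is in place), fetching 15 pages should line up with the 300-movie target mentioned above:

movies = get_tmdb_movies(pages=15)
print(len(movies))  # 15 pages x 20 movies per page = 300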

Each page holds 20 records. A single record looks like this; the overview field is the target we'll convert into a word vector:

{
  "adult": false,
  "backdrop_path": "/kXfqcdQKsToO0OUXHcrrNCHDBzO.jpg",
  "genre_ids": [
    18,
    80
  ],
  "id": 278,
  "original_language": "en",
  "original_title": "The Shawshank Redemption",
  "overview": "Imprisoned in the 1940s for the double murder of his wife and her lover, upstanding banker Andy Dufresne begins a new life at the Shawshank prison, where he puts his accounting skills to work for an amoral warden. During his long stretch in prison, Dufresne comes to be admired by the other inmates -- including an older prisoner named Red -- for his integrity and unquenchable sense of hope.",
  "popularity": 126.062,
  "poster_path": "/9cqNxx0GxF0bflZmeSMuL5tnGzr.jpg",
  "release_date": "1994-09-23",
  "title": "The Shawshank Redemption",
  "video": false,
  "vote_average": 8.707,
  "vote_count": 26899
}

As mentioned in the previous post, converting text into word vectors can be done with an LLM. Here we'll use the Inference API provided by Hugging Face (registration is required to get a token, and free accounts have a quota on conversions). The script is as follows:

def embedding(model_id, texts):
    # Hugging Face Inference API token from the environment
    hf_token = os.getenv("HF_TOKEN")
    api_url = f"https://api-inference.huggingface.co/pipeline/feature-extraction/{model_id}"
    headers = {"Authorization": f"Bearer {hf_token}"}
    # wait_for_model: block until the model is loaded instead of failing with a 503
    body = {"inputs": texts, "options": {"wait_for_model": True}}
    response = requests.post(api_url, headers=headers, json=body)
    if response.status_code != 200:
        raise RuntimeError(f"HF Inference API error: {response.status_code}, {response.text}")
    return response.json()

This function lets you specify which model to use; for now it's sentence-transformers/all-MiniLM-L6-v2.
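Before wiring this into the indexing flow, a quick smoke test helps confirm the output shape; this is a minimal sketch assuming HF_TOKEN is set, and the input string is just an illustrative example:

model_id = "sentence-transformers/all-MiniLM-L6-v2"
vec = embedding(model_id, "a hopeful banker survives decades in prison")
print(len(vec))  # all-MiniLM-L6-v2 produces 384-dimensional sentence embeddings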

Next we design the index mapping around the data. The field that holds the vector is named overview_vector; apart from specifying its type as dense_vector, the only extra setting is dims of 384, which matches the output size of all-MiniLM-L6-v2.

{
    "properties": {
        "adult": {"type": "boolean"},
        "backdrop_path": {"type": "keyword"},
        "genre_ids": {"type": "integer"},
        "id": {"type": "integer"},
        "original_language": {"type": "keyword"},
        "original_title": {"type": "text", "fields": {"keyword": {"type": "keyword"}}},
        "overview": {"type": "text"},
        "overview_vector": {"type": "dense_vector", "dims": 384},
        "popularity": {"type": "float"},
        "poster_path": {"type": "keyword"},
        "release_date": {"type": "date"},
        "title": {"type": "text", "fields": {"kw": {"type": "keyword"}}},
        "video": {"type": "boolean"},
        "vote_average": {"type": "float"},
        "vote_count": {"type": "integer"}
    }
}
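One hedge on this mapping: on recent ES releases (8.11+) a dense_vector field is indexed for kNN by default with cosine similarity, but on earlier 8.x versions indexing had to be turned on explicitly. If your cluster is older, a variant of the vector field with those settings spelled out would look like this:

"overview_vector": {
    "type": "dense_vector",
    "dims": 384,
    "index": true,
    "similarity": "cosine"
}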

Next, the data is transformed into the format bulk insert expects. The parts that read from a file are there so that, while testing, an errored re-run doesn't keep re-fetching the same word vectors, wasting Hugging Face API quota and waiting time.

def data_to_es(movie_data):
    # cache of already-embedded movies keyed by _id, so that re-running after an
    # error does not re-fetch the same word vectors and waste HF API quota
    existed_ids = {}
    if os.path.exists("es_data.json"):
        with open("es_data.json", "r") as f:
            for line in f:
                data = json.loads(line)
                existed_ids[data["_id"]] = data

    for movie in movie_data:
        if movie['id'] not in existed_ids:
            movie_description = movie['overview']
            embeddings = embedding(model_id, movie_description)
            movie['overview_vector'] = embeddings
            # rename id to _id and attach the target index so helpers.bulk
            # can consume the dict directly as a bulk action
            movie["_id"] = movie.pop('id')
            movie["_index"] = index_name
            with open("es_data.json", "a") as f:
                json.dump(movie, f)
                f.write("\n")
        else:
            movie = existed_ids[movie['id']]
        yield movie
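For reference, each dict this generator yields already carries the bulk metadata, so helpers.bulk can consume it as-is. A yielded action looks roughly like this (fields and vector values truncated for readability):

{
    "_index": "tmdb_top_rated_movies",
    "_id": 278,
    "title": "The Shawshank Redemption",
    "overview": "Imprisoned in the 1940s ...",
    "overview_vector": [0.0123, -0.0456, ...]
}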

All the code combined with the process flow looks like this (please register with the respective services to obtain the tokens yourself):

import requests
import os
import json
import random
import time
import elasticsearch
from elasticsearch import helpers

def get_tmdb_movies(pages=1):
    # assumes the TMDB API token is exported as TMDB_TOKEN (register on TMDB to get one)
    tmdb_token = os.getenv("TMDB_TOKEN")
    data = []
    headers = {
        "accept": "application/json",
        "Authorization": f"Bearer {tmdb_token}"
    }
    for page in range(pages):
        # each page of the top_rated endpoint holds 20 movies
        url = f"https://api.themoviedb.org/3/movie/top_rated?language=zh-TW&page={page+1}"
        response = requests.get(url, headers=headers)
        if response.status_code == 200:
            data.extend(response.json()['results'])
            # small random pause between requests to be polite to the API
            time.sleep(random.randint(1, 3))
        else:
            print(f"Error: {response.status_code}, {response.text}")
    return data

def embedding(model_id, texts):
    # Hugging Face Inference API token from the environment
    hf_token = os.getenv("HF_TOKEN")
    api_url = f"https://api-inference.huggingface.co/pipeline/feature-extraction/{model_id}"
    headers = {"Authorization": f"Bearer {hf_token}"}
    # wait_for_model: block until the model is loaded instead of failing with a 503
    body = {"inputs": texts, "options": {"wait_for_model": True}}
    response = requests.post(api_url, headers=headers, json=body)
    if response.status_code != 200:
        raise RuntimeError(f"HF Inference API error: {response.status_code}, {response.text}")
    return response.json()

def data_to_es(movie_data):
    # cache of already-embedded movies keyed by _id, so that re-running after an
    # error does not re-fetch the same word vectors and waste HF API quota
    existed_ids = {}
    if os.path.exists("es_data.json"):
        with open("es_data.json", "r") as f:
            for line in f:
                data = json.loads(line)
                existed_ids[data["_id"]] = data

    for movie in movie_data:
        if movie['id'] not in existed_ids:
            movie_description = movie['overview']
            embeddings = embedding(model_id, movie_description)
            movie['overview_vector'] = embeddings
            # rename id to _id and attach the target index so helpers.bulk
            # can consume the dict directly as a bulk action
            movie["_id"] = movie.pop('id')
            movie["_index"] = index_name
            with open("es_data.json", "a") as f:
                json.dump(movie, f)
                f.write("\n")
        else:
            movie = existed_ids[movie['id']]
        yield movie

if __name__ == "__main__":
    st = time.time()
    model_id = "sentence-transformers/all-MiniLM-L6-v2"

    es_cli = elasticsearch.Elasticsearch("http://localhost:9200")
    index_name = 'tmdb_top_rated_movies'

    # read index mapping file
    with open("index_mapping_movies.json", "r") as f:
        index_mapping = json.load(f)

    # create index
    # es_cli.indices.delete(index=index_name)
    if not es_cli.indices.exists(index=index_name):
        es_cli.indices.create(index=index_name, mappings=index_mapping)

    # 300 top rated movies (15 pages x 20), source: TMDB
    # update data with embeddings
    pages = 15
    print("get movie data from tmdb")
    if not os.path.exists("movie_data.json"):
        movie_data = get_tmdb_movies(pages=pages)
        print("write movie data from tmdb")
        with open("movie_data.json", "w") as f:
            json.dump(movie_data, f)
    else:
        with open("movie_data.json", "r") as f:
            movie_data = json.load(f)
    print("index movie data to es")
    helpers.bulk(es_cli, data_to_es(movie_data))
    ed = time.time()
    print(ed - st)

A first run may take around 10-20 minutes. When it's done, head over to the Kibana Dev Tools to take a look at the result:

https://ithelp.ithome.com.tw/upload/images/20241006/201694485BuNacLAbe.png
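If you prefer explicit checks over eyeballing the screenshot, a document count and a single fetch in Dev Tools are enough; 278 is the TMDB id of The Shawshank Redemption from the sample record above:

GET tmdb_top_rated_movies/_count

GET tmdb_top_rated_movies/_doc/278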

In the next post, let's kick off KNN search~

